package main

import (
	"bytes"
	"context"
	"encoding/json"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/olivere/elastic/v7"
	"io/ioutil"
	"net/http"
	"os"
	"strings"
	"time"
)

type Member struct {
	Id                   string     `json:"id,omitempty"`
	NickName             string     `json:"nickname,omitempty"`
	Age                  int64      `json:"age,omitempty"`
	Sex                  string     `json:"sex,omitempty"`
	Avatar               string     `json:"avatar"`
	CollectorId          string     `json:"collectorId,omitempty"`
	CollectorOrgan       string     `json:"collectorOrgan,omitempty"`
	CaptureTimestamp     int64      `json:"captureTimestamp,omitempty"`
	UsedNickname         []string   `json:"usedNickname,omitempty"`
	DataSource           string     `json:"dataSource,omitempty"`
	MessageCount7        int64      `json:"messageCount7,omitempty"`
	AlarmCount7          int64      `json:"alarmCount7,omitempty"`
	Watched              bool       `json:"watched,omitempty"`
	Tags                 []string   `json:"tags,omitempty"`
	Signature            string     `json:"signature"`
	Comment              string     `json:"comment,omitempty"`
	UsedName             []string   `json:"usedName,omitempty"`
	Chatrooms            []Chatroom `json:"chatrooms,omitempty"`
	Name                 string     `json:"name,omitempty"`
	Email                string     `json:"email,omitempty"`
	PhoneNumber          []string   `json:"phoneNumber,omitempty"`
	IdCard               string     `json:"idCard,omitempty"`
	IdCardAvatar         string     `json:"idCardAvatar,omitempty"`
	Address              []string   `json:"address"`
	ResidenceAddress     string     `json:"residenceAddress,omitempty"` //常住地
	CensusAddress        string     `json:"censusAddress,omitempty"`    //户籍
	UsedSignature        []string   `json:"usedSignature,omitempty"`
	Aliases              []string   `json:"aliases,omitempty"`              //群内昵称
	LastMessageTimestamp int64      `json:"lastMessageTimestamp,omitempty"` //最后发言时间
}

type Chatroom struct {
	Id               string    `json:"id,omitempty"`
	Name             string    `json:"name,omitempty"`
	Owner            Member    `json:"owner,omitempty"`
	Avatar           string    `json:"avatar"`
	CollectorId      string    `json:"collectorId,omitempty"`
	CollectorOrgan   string    `json:"collectorOrgan,omitempty"`
	CaptureTimestamp int64     `json:"captureTimestamp,omitempty"`
	DataSource       string    `json:"dataSource,omitempty"`
	MessageCount     int64     `json:"messageCount,omitempty"`
	Watched          bool      `json:"watched,omitempty"`
	Tags             []string  `json:"tags,omitempty"`
	Category         string    `json:"category,omitempty"`
	Comment          string    `json:"comment,omitempty"`
	UsedName         []string  `json:"usedName,omitempty"`
	Members          []*Member `json:"members,omitempty"`
}

type Message struct {
	Id               string   `json:"id,omitempty"`
	Type             string   `json:"type,omitempty"`
	Content          string   `json:"content,omitempty"`
	FilePath         string   `json:"filePath,omitempty"`
	Timestamp        int64    `json:"timestamp,omitempty"`
	CollectorId      string   `json:"collectorId,omitempty"`
	CollectorOrgan   string   `json:"collectorOrgan,omitempty"`
	CaptureTimestamp int64    `json:"captureTimestamp,omitempty"`
	DataSource       string   `json:"dataSource,omitempty"`
	Watched          bool     `json:"watched,omitempty"`
	Topics           []Topic  `json:"topics,omitempty"`
	Member           Member   `json:"member,omitempty"`
	Chatroom         Chatroom `json:"chatroom,omitempty"`
	Score            float64  `json:"score,omitempty"`
}

type Topic struct {
	Id           string   `json:"id"`
	ParentId     string   `json:"parentId,omitempty"`
	IsDefault    bool     `json:"isDefault,omitempty"`
	KeywordScore float64  `json:"keywordScore,omitempty"`
	Keywords     []string `json:"keywords,omitempty"`
	Name         string   `json:"name,omitempty"`
	Score        float64  `json:"score,omitempty"`
}

type OffsetMessage struct {
	Message
	Offset    *int64  `json:"offset"`
	Topic     *string `json:"topic"`
	Partition *int32  `json:"partition"`
}

type ESMessage struct {
	OffsetMessage
	AdvertisingFlag  *bool    `json:"advertisingFlag"`
	AdvertisingScore *float64 `json:"advertisingScore"`
}

type MessageInfo struct {
	Offset    *int64
	Topic     *string
	Partition *int32
}

type RequestData struct {
	Data *[]*OffsetMessage `json:"data"`
}

type ResponseData struct {
	Data  *[]*ESMessage `json:"data"`
	Error *int          `json:"err"`
}

type workerMessageRepository struct {
	logger            log.Logger
	MessagesChan      chan *OffsetMessage
	ESMessageChan     chan *ESMessage
	RequestFailedChan chan *bytes.Reader
	ACKChan           chan *MessageInfo
}

type esMessageRepository struct {
	logger            log.Logger
	ElasticClient     *elastic.Client
	ESMessageChan     chan *ESMessage
	ACKChan           chan *MessageInfo
	alarmMessageIndex string
}

func NewRequests() []elastic.BulkableRequest {
	return make([]elastic.BulkableRequest, 0, 1000)
}

func NewMessageInfos() []*MessageInfo {
	return make([]*MessageInfo, 0, 1000)
}

func NewWorkerMessageRepository(logger log.Logger, messageChan chan *OffsetMessage, esMessageChan chan *ESMessage, ackChan chan *MessageInfo) *workerMessageRepository {
	return &workerMessageRepository{
		logger:            logger,
		MessagesChan:      messageChan,
		ESMessageChan:     esMessageChan,
		RequestFailedChan: make(chan *bytes.Reader, 20),
		ACKChan:           ackChan,
	}
}

func NewESMessageRepository(logger log.Logger, elasticClient *elastic.Client, esMessageChan chan *ESMessage, ackChan chan *MessageInfo, alarmMessageIndex string) *esMessageRepository {
	return &esMessageRepository{
		logger:            logger,
		ElasticClient:     elasticClient,
		ESMessageChan:     esMessageChan,
		ACKChan:           ackChan,
		alarmMessageIndex: alarmMessageIndex,
	}
}

// 将数据交由worker处理并返回
func (wr *workerMessageRepository) ParseMessage(API string) {
	for {
		body := wr.GetBody()
		resp, err := http.Post(API, "application/json", body)
		if err != nil {
			_ = level.Error(wr.logger).Log("request", err)
			resp = wr.ReRequest(API, body)
			if resp == nil {
				_ = wr.logger.Log("workerClose", strings.Split(API, "/api/v1/task")[0])
				return
			}
		}
		if resp.StatusCode != 200 {
			// API 或 server 有问题，直接删除该worker节点
			_ = wr.logger.Log("workerClose", strings.Split(API, "/api/v1/task")[0])
			wr.RequestFailedChan <- body
			_ = resp.Body.Close()
			return
		}
		res, _ := ioutil.ReadAll(resp.Body)
		_ = resp.Body.Close()
		respData := &ResponseData{}
		_ = json.Unmarshal(res, respData)
		if respData.Error != nil {
			_ = level.Error(wr.logger).Log("response", *respData.Error)
			switch {
			case *respData.Error == 1:
				for _, msgInfo := range *respData.Data {
					wr.ACKChan <- &MessageInfo{
						Offset:    msgInfo.Offset,
						Topic:     msgInfo.Topic,
						Partition: msgInfo.Partition,
					}
				}
				continue
			case *respData.Error == 2:
				ok := wr.ReGetResponse(API, body)
				if ok {
					continue
				} else {
					_ = wr.logger.Log("workerClose", strings.Split(API, "/api/v1/test"))
					return
				}
			}
		}
		for _, esMessage := range *respData.Data {
			wr.ESMessageChan <- esMessage
		}
	}
}

func (wr *workerMessageRepository) SliceControl(s []*OffsetMessage) []*OffsetMessage {
	timer := time.NewTimer(WaitTime)
	for {
		if len(s) >= 1000 {
			return s
		}
		select {
		case <-timer.C:
			if len(s) == 0 {
				timer = time.NewTimer(WaitTime)
				continue
			}
			return s
		default:
			select {
			case msg := <-wr.MessagesChan:
				s = append(s, msg)
			default:
				time.Sleep(time.Second * 1)
			}
		}
	}
}

func (wr *workerMessageRepository) GetBody() (body *bytes.Reader) {
	select {
	case body = <-wr.RequestFailedChan:
	default:
		var parseDataList []*OffsetMessage
		parseDataList = wr.SliceControl(parseDataList)
		reqData, _ := json.Marshal(&RequestData{Data: &parseDataList})
		body = bytes.NewReader(reqData)
	}
	return body
}

func (wr *workerMessageRepository) ReRequest(API string, body *bytes.Reader) *http.Response {
	reRequestCount := 0
	for {
		if reRequestCount > RetryCount {
			wr.RequestFailedChan <- body
			// TODO
			_ = level.Error(wr.logger).Log("reRequest", "fail")
			return nil
		}
		reRequestCount += 1
		_ = wr.logger.Log("reRequest", true, "count", reRequestCount)
		resp, err := http.Post(API, "application/json", body)
		if err != nil {
			// TODO
			_ = level.Error(wr.logger).Log("reRequest", err, "count", reRequestCount)
			continue
		}
		return resp
	}
}

func (wr *workerMessageRepository) ReGetResponse(API string, body *bytes.Reader) bool {
	reGetResponseCount := 0
	for {
		if reGetResponseCount > RetryCount {
			wr.RequestFailedChan <- body
			return false
		}
		reGetResponseCount += 1
		_ = wr.logger.Log("reGetResponse", true, "count", reGetResponseCount)
		resp, err := http.Post(API, "application/json", body)
		if err != nil {
			_ = level.Error(wr.logger).Log("reGetResponse", err)
			continue
		}
		if resp.StatusCode != 200 {
			wr.RequestFailedChan <- body
			return false
		}
		respData := &ResponseData{}
		res, _ := ioutil.ReadAll(resp.Body)
		_ = json.Unmarshal(res, respData)
		if respData.Error != nil {
			_ = level.Error(wr.logger).Log("response", err)
			switch {
			case *respData.Error == 1:
				for _, msgInfo := range *respData.Data {
					wr.ACKChan <- &MessageInfo{
						Offset:    msgInfo.Offset,
						Topic:     msgInfo.Topic,
						Partition: msgInfo.Partition,
					}
				}
				return true
			case *respData.Error == 2:
				continue
			}
		} else {
			for _, esMessage := range *respData.Data {
				wr.ESMessageChan <- esMessage
			}
			return true
		}
		_ = resp.Body.Close()
	}
}

func (esr *esMessageRepository) SaveMessage() {
	_ = esr.logger.Log("")
	for {
		msgInfos := NewMessageInfos()
		requests := NewRequests()
		msgInfos = esr.RequestsControl(requests, msgInfos)
		for _, msgInfo := range msgInfos {
			esr.ACKChan <- msgInfo
		}
	}
}

func (esr *esMessageRepository) RequestsControl(r []elastic.BulkableRequest, ms []*MessageInfo) []*MessageInfo {
	timer := time.NewTimer(WaitTime)
	for {
		if len(r) >= 1000 {
			_, err := esr.ElasticClient.Bulk().Add(r...).Do(context.Background())
			if err != nil {
				_ = level.Error(esr.logger).Log("elasticSave", err)
				esr.ReSaveMessage(r)
			}
			_ = esr.logger.Log("saveMessage", len(ms))
			return ms
		}
		select {
		case <-timer.C:
			if len(r) == 0 {
				timer = time.NewTimer(WaitTime)
				continue
			}
			_, err := esr.ElasticClient.Bulk().Add(r...).Do(context.Background())
			if err != nil {
				_ = level.Error(esr.logger).Log("elasticSave", err)
				esr.ReSaveMessage(r)
			}
			_ = esr.logger.Log("saveMessage", len(r))
			return ms
		default:
			select {
			case esMessage := <-esr.ESMessageChan:
				if len(esMessage.Topics) > 0 {
					r = append(
						r,
						elastic.
							NewBulkUpdateRequest().
							Index(esr.alarmMessageIndex).
							Id(esMessage.Id).
							DocAsUpsert(true).
							Doc(esMessage),
					)
				}
				ms = append(
					ms,
					&MessageInfo{
						Offset:    esMessage.Offset,
						Topic:     esMessage.Topic,
						Partition: esMessage.Partition,
					},
				)
			default:
				time.Sleep(time.Second * 1)
			}
		}
	}
}

func (esr *esMessageRepository) ReSaveMessage(r []elastic.BulkableRequest) {
	if len(r) == 0 {
		return
	}
	reSaveMessageCount := 0
	for {
		if reSaveMessageCount%RetryCount == 0 {
			res, err := esr.ElasticClient.CatHealth().Do(context.Background())
			if err != nil {
				_ = level.Error(esr.logger).Log("reSaveMessage", err, "reSaveCount", reSaveMessageCount)
				continue
			}
			if res[0].Status == "red" {
				_ = level.Error(esr.logger).Log("esServerFail", "red")
				os.Exit(1)
			}
			time.Sleep(time.Second * 2)
		}
		_, err := esr.ElasticClient.Bulk().Add(r...).Do(context.Background())
		if err != nil {
			_ = level.Error(esr.logger).Log("reSaveMessage", err, "reSaveCount", reSaveMessageCount)
			reSaveMessageCount += 1
			continue
		}
		_ = esr.logger.Log("reSaveMessage", len(r))
		return
	}
}
